package org.ricks.event;

import org.ricks.actor.Actor;
import org.ricks.actor.ActorMgr;
import org.ricks.actor.ActorRunnable;
import org.ricks.log.Logger;
import org.ricks.utils.CollUtil;
import org.ricks.utils.RandomUtil;
import java.util.*;
import java.util.concurrent.*;

/**
 * Static event bus: keeps a registry of {@link IEventReceiver}s per event class and
 * dispatches events to them either on the calling thread or on one of a fixed set of
 * "event" actors.
 *
 * <p>The receiver registry uses concurrent collections, so receivers may be registered
 * while events are being dispatched from other threads.
 *
 * @author chenwei
 * @date 2022/3/3 14:50
 */
public abstract class EventBus {

    // NOTE(review): not read or written anywhere in this class.
    private static boolean isAsy = false;

    /**
     * Number of event actors: twice the number of available processors.
     */
    public static final int EXECUTORS_SIZE = Runtime.getRuntime().availableProcessors() * 2;

    // Looked up by threadExecutor(long); nothing in this class ever puts entries into it.
    private static final Map<Long, Actor> threadMap = new ConcurrentHashMap<>();

    /**
     * Event class -> receivers registered for exactly that class.
     *
     * <p>ConcurrentHashMap + CopyOnWriteArrayList: the receiver lists are handed to actor
     * threads for iteration while {@link #registerEventReceiver} may still be adding to
     * them, which is unsafe with HashMap/LinkedList.
     */
    private static final Map<Class<? extends IEvent>, List<IEventReceiver>> receiverMap = new ConcurrentHashMap<>();

    // Subscriber methods. NOTE(review): not used anywhere in this class.
    private static List<SubscriberMethod> subscriberMethods = new ArrayList<>();

    /** Actors that run asynchronously submitted events and tasks. */
    private static final Actor[] actors = ActorMgr.get().createActors(EXECUTORS_SIZE, "event");

    /**
     * Publishes an event synchronously; receivers run on the calling thread.
     *
     * <p>Does nothing when no receiver is registered for the event's exact class.
     *
     * @param event the event to publish
     */
    public static void syncSubmit(IEvent event) {
        var list = receiverMap.get(event.getClass());
        if (CollUtil.isEmpty(list)) {
            return;
        }
        doSubmit(event, list);
    }

    /**
     * Invokes every receiver with the event. A failure in one receiver is logged and
     * does not prevent the remaining receivers from being invoked.
     *
     * @param event        the event being delivered
     * @param receiverList all receivers to notify
     */
    private static void doSubmit(IEvent event, List<IEventReceiver> receiverList) {
        for (var receiver : receiverList) {
            try {
                receiver.invoke(event);
            } catch (Exception e) {
                Logger.error("eventBus未知exception异常", e);
            } catch (Throwable t) {
                Logger.error("eventBus未知error异常", t);
            }
        }
    }

    /**
     * Publishes an event asynchronously on one of the event actors. The actor is chosen
     * from {@code event.threadId()}, so events with the same thread id go to the same actor.
     *
     * <p>Does nothing when no receiver is registered for the event's exact class.
     *
     * @param event the event to publish
     */
    public static void asyncSubmit(IEvent event) {
        var list = receiverMap.get(event.getClass());
        if (CollUtil.isEmpty(list)) {
            return;
        }

        actorFor(event.threadId()).execute(new ActorRunnable("async-event", () -> doSubmit(event, list)));
    }

    /**
     * Publishes an event asynchronously on the actor supplied by the caller.
     *
     * <p>Does nothing when no receiver is registered for the event's exact class.
     *
     * @param event the event to publish
     * @param actor the actor on which the receivers are invoked
     */
    public static void actorSubmit(IEvent event, Actor actor) {
        var list = receiverMap.get(event.getClass());
        if (CollUtil.isEmpty(list)) {
            return;
        }

        actor.execute(new ActorRunnable("async-event", () -> doSubmit(event, list)));
    }

    /**
     * Runs a task on an event actor selected from a random integer.
     *
     * @param runnable the task to run
     */
    public static void asyncExecute(Runnable runnable) {
        execute(RandomUtil.randomInt(), runnable);
    }

    /**
     * Runs a task on the event actor selected by the given hash code; equal hash codes
     * always select the same actor.
     *
     * @param hashcode value used to pick the actor
     * @param runnable the task to run
     */
    public static void execute(int hashcode, Runnable runnable) {
        actorFor(hashcode).execute(new ActorRunnable("event", runnable));
    }

    /**
     * Registers a receiver for an event type. Lookup at dispatch time is by the event's
     * exact runtime class.
     *
     * @param eventType the event class the receiver listens to
     * @param receiver  the receiver to invoke for events of that class
     */
    public static void registerEventReceiver(Class<? extends IEvent> eventType, IEventReceiver receiver) {
        receiverMap.computeIfAbsent(eventType, it -> new CopyOnWriteArrayList<>()).add(receiver);
    }

    /**
     * Returns the actor recorded for the given thread id.
     *
     * <p>NOTE(review): nothing in this class populates the underlying map, so as written
     * this returns {@code null} for every id.
     *
     * @param currentThreadId the thread id to look up
     * @return the mapped actor, or {@code null} if there is none
     */
    public static Actor threadExecutor(long currentThreadId) {
        return threadMap.get(currentThreadId);
    }

    /**
     * Maps a hash value onto one of the event actors.
     *
     * <p>The remainder lies strictly between -EXECUTORS_SIZE and EXECUTORS_SIZE, so taking
     * its absolute value cannot overflow and always yields a valid index.
     */
    private static Actor actorFor(int hash) {
        return actors[Math.abs(hash % EXECUTORS_SIZE)];
    }

}
